package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL - Group Windows
 * Tumbling window - TUMBLE(time_attr, interval)
 */
public class Flink06_FlinkSQL_GroupWindow01 {

    /**
     * DDL for the {@code sensor} table.
     * <p>
     * The table is read through the {@code filesystem} connector from
     * {@code input/sensor.txt} in CSV format. Besides the physical columns
     * {@code id}, {@code ts} and {@code vc}, it defines a computed column
     * {@code t}: {@code ts} is divided by 1000, formatted by
     * {@code from_unixtime} and parsed back by {@code to_timestamp}.
     * A watermark is declared on {@code t} that lags it by 5 seconds.
     */
    private static final String SENSOR_DDL =
            "create table sensor("
                    + "id string,"
                    + "ts bigint,"
                    + "vc int,"
                    + "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')),"
                    + "watermark for t as t - interval '5' second)"
                    + "with("
                    + "'connector' = 'filesystem',"
                    + "'path' = 'input/sensor.txt',"
                    + "'format' = 'csv'"
                    + ")";

    /**
     * Aggregation query: groups rows by {@code id} and by a 1-minute
     * {@code TUMBLE} window over {@code t}, and selects the window start,
     * the window end and the sum of {@code vc} for each group.
     */
    private static final String TUMBLE_SUM_QUERY =
            "SELECT "
                    + " id, "
                    + " TUMBLE_START(t, INTERVAL '1' minute) as wStart, "
                    + " TUMBLE_END(t, INTERVAL '1' minute) as wEnd, "
                    + " SUM(vc) sum_vc "
                    + " from sensor "
                    + " GROUP BY TUMBLE(t, INTERVAL '1' minute), id ";

    /**
     * Creates the {@code sensor} table, runs the tumbling-window aggregation
     * over it and prints the result.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Stream environment with parallelism fixed to 1.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        // Table environment layered on top of the stream environment.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        // Register the source table.
        tEnv.executeSql(SENSOR_DDL);

        // Run the windowed aggregation and print its rows.
        tEnv.sqlQuery(TUMBLE_SUM_QUERY).execute().print();
    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
#####################################################
+----+-------------+-------------------------+-------------------------+-------------+
| op |          id |                  wStart |                    wEnd |      sum_vc |
+----+-------------+-------------------------+-------------------------+-------------+
| +I |    sensor_1 | 1970-01-01 08:00:00.000 | 1970-01-01 08:01:00.000 |         430 |
| +I |    sensor_2 | 1970-01-01 08:00:00.000 | 1970-01-01 08:01:00.000 |         140 |
+----+-------------+-------------------------+-------------------------+-------------+
 */
